24 concurrent.futures并发执行
threading和multiprocessing虽然强大,但用起来有点繁琐——手动创建线程/进程、管理生命周期、收集结果。concurrent.futures提供了更高级的接口,用线程池和进程池简化并发编程。
一、ThreadPoolExecutor:线程池
1.1 基本用法
python
from concurrent.futures import ThreadPoolExecutor
def task(n):
    """Simulate one second of blocking work, then return ``n`` squared."""
    import time

    time.sleep(1)
    return n * n
# 创建线程池,最多3个线程
with ThreadPoolExecutor(max_workers=3) as executor:
# submit提交任务,返回Future对象
futures = [executor.submit(task, i) for i in range(5)]
# 获取结果
for f in futures:
print(f.result()) # 阻塞直到任务完成1.2 map方法
更简洁的并行映射。
python
from concurrent.futures import ThreadPoolExecutor
def task(n):
    """Sleep for one second to mimic blocking work and return ``n * n``."""
    import time

    time.sleep(1)
    return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
# map:并行映射,按顺序返回结果
results = list(executor.map(task, [1, 2, 3, 4, 5]))
print(results) # [1, 4, 9, 16, 25]1.3 as_completed
按完成顺序获取结果。
python
from concurrent.futures import ThreadPoolExecutor, as_completed
def task(n):
    """Sleep for a random 0.5-2 seconds, then return ``n`` squared."""
    import random
    import time

    # The random delay makes the tasks finish in an unpredictable order.
    time.sleep(random.uniform(0.5, 2))
    return n * n
with ThreadPoolExecutor(max_workers=3) as executor:
futures = {executor.submit(task, i): i for i in range(5)}
# as_completed:按完成顺序返回
for future in as_completed(futures):
n = futures[future]
result = future.result()
print(f"任务 {n} 完成: {result}")二、ProcessPoolExecutor:进程池
接口和ThreadPoolExecutor基本一致,只是用进程代替线程。需要注意两点:提交的函数、参数和返回值必须能被pickle序列化;创建进程池的代码要放在 if __name__ == '__main__': 保护之下。
python
from concurrent.futures import ProcessPoolExecutor
def cpu_intensive(n):
    """Return the sum of ``i * i`` for every ``i`` in ``range(n)``.

    The explicit loop is deliberate: it stands in for CPU-bound work.
    """
    total = 0
    for i in range(n):
        total += i * i
    return total
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(cpu_intensive, [10**6] * 8))
print(results)三、Future对象
3.1 获取结果
python
from concurrent.futures import ThreadPoolExecutor
def task(n):
    """Block for one second, then return the square of ``n``."""
    import time

    time.sleep(1)
    return n * n
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 5)
# 检查是否完成
print(future.done()) # False
# 获取结果(阻塞)
result = future.result()
print(result) # 25
# 再次检查
print(future.done()) # True3.2 超时
python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
def slow_task():
    """Sleep for ten seconds, then return the string "完成"."""
    import time

    time.sleep(10)
    return "完成"
with ThreadPoolExecutor() as executor:
future = executor.submit(slow_task())
try:
result = future.result(timeout=3) # 3秒超时
except TimeoutError:
print("任务超时!")3.3 回调
python
from concurrent.futures import ThreadPoolExecutor
def task(n):
    """Return ``n`` squared."""
    return n * n
def callback(future):
    """Print the result of a finished Future.

    ``future.result()`` re-raises the task's exception if the task failed.
    """
    print(f"任务完成,结果: {future.result()}")
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 5)
future.add_done_callback(callback)3.4 取消任务
python
from concurrent.futures import ThreadPoolExecutor
def task(n):
    """Sleep for five seconds, then return ``n`` squared."""
    import time

    time.sleep(5)
    return n * n
with ThreadPoolExecutor() as executor:
future = executor.submit(task, 5)
# 取消任务(只能取消还未开始的任务)
cancelled = future.cancel()
print(f"取消成功: {cancelled}")四、wait()
更灵活的等待方式。
python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED
def task(n):
    """Sleep for ``n`` seconds, then return ``n`` unchanged."""
    import time

    time.sleep(n)
    return n
with ThreadPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in [3, 1, 2]]
# FIRST_COMPLETED:有一个完成就返回
done, not_done = wait(futures, return_when=FIRST_COMPLETED)
print(f"已完成: {len(done)}, 未完成: {len(not_done)}")
# ALL_COMPLETED:所有完成才返回
done, not_done = wait(futures, return_when=ALL_COMPLETED)
print(f"全部完成: {len(done)}")return_when参数:
- FIRST_COMPLETED:有一个完成(或被取消)就返回
- FIRST_EXCEPTION:有一个抛出异常就返回;如果没有任务抛出异常,则等同于ALL_COMPLETED
- ALL_COMPLETED:所有完成才返回(默认)
五、异常处理
5.1 捕获异常
python
from concurrent.futures import ThreadPoolExecutor
def risky_task(n):
    """Return ``n`` squared, except that ``n == 3`` raises ValueError.

    Raises:
        ValueError: when ``n`` is 3, to simulate a failing task.
    """
    if n == 3:
        raise ValueError(f"任务 {n} 出错了")
    return n * n
with ThreadPoolExecutor() as executor:
futures = [executor.submit(risky_task, i) for i in range(5)]
for i, future in enumerate(futures):
try:
result = future.result()
print(f"任务 {i}: {result}")
except Exception as e:
print(f"任务 {i} 异常: {e}")5.2 return_exceptions
python
from concurrent.futures import ThreadPoolExecutor
def risky_task(n):
    """Square ``n``; the input 3 fails with ValueError to simulate an error.

    Raises:
        ValueError: when ``n`` is 3.
    """
    if n == 3:
        raise ValueError(f"任务 {n} 出错了")
    return n * n
with ThreadPoolExecutor() as executor:
# map的异常会直接抛出
try:
results = list(executor.map(risky_task, range(5)))
except ValueError as e:
print(f"异常: {e}")六、实战场景
6.1 并发请求
python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def fetch_url(url):
    """Pretend to fetch ``url`` and return a placeholder response string."""
    time.sleep(1)  # stands in for the network round trip
    return f"{url} 的响应"
urls = [f"http://example.com/api/{i}" for i in range(10)]
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(fetch_url, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
result = future.result()
print(f"{url}: {result}")6.2 批量文件处理
python
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
def process_file(filepath):
    """Read a text file and return a "<name>: <count> 字符" summary string.

    Args:
        filepath: a ``pathlib.Path``; its ``.name`` is used in the summary.

    Returns:
        The file name followed by the number of characters it contains.
    """
    # An explicit encoding keeps the character count independent of the
    # platform's default locale encoding.
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()
    return f"{filepath.name}: {len(content)} 字符"
files = list(Path("./data").glob("*.txt"))
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(process_file, files))
for r in results:
print(r)6.3 超时控制
python
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
def task(n):
    """Block for ``n`` seconds and hand ``n`` back as the result."""
    import time

    time.sleep(n)
    return n
with ThreadPoolExecutor() as executor:
futures = [executor.submit(task, i) for i in [1, 2, 5, 3]]
try:
for future in as_completed(futures, timeout=3):
print(f"结果: {future.result()}")
except TimeoutError:
print("部分任务超时")七、选择ThreadPoolExecutor还是ProcessPoolExecutor
| 场景 | 用什么 |
|---|---|
| I/O密集型(网络请求、文件读写) | ThreadPoolExecutor |
| CPU密集型(计算、数据处理) | ProcessPoolExecutor |
| 不确定 | ThreadPoolExecutor(更轻量) |
八、总结
concurrent.futures的核心:
| 组件 | 用途 |
|---|---|
| ThreadPoolExecutor | 线程池 |
| ProcessPoolExecutor | 进程池 |
| Future | 异步计算结果 |
| as_completed() | 按完成顺序获取 |
| wait() | 灵活等待任务 |
使用模式:
python
with ThreadPoolExecutor(max_workers=5) as executor:
# 方式1:submit
futures = [executor.submit(task, arg) for arg in args]
for f in futures:
print(f.result())
# 方式2:map
results = list(executor.map(task, args))concurrent.futures是Python并发编程的首选方式,比直接用threading和multiprocessing简单得多。